package com.xiaoxin.executor.batch.job.BlogInfo;

import com.xiaoxin.executor.batch.listener.MyJobListener;
import com.xiaoxin.executor.batch.listener.MyReadListener;
import com.xiaoxin.executor.batch.listener.MyWriteListener;
import com.xiaoxin.executor.batch.processor.MyItemProcessorNew;
import com.xiaoxin.executor.batch.validator.MyBeanValidator;
import com.xiaoxin.executor.domain.BlogInfo;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Spring Batch wiring for the {@code myJobNew} job.
 *
 * <p>The job consists of a single chunk-oriented step ({@code stepNew}) that reads
 * {@link BlogInfo} rows through a MyBatis cursor reader, passes them through
 * {@link MyItemProcessorNew} (with a {@link MyBeanValidator} attached) and writes them
 * with a {@link JdbcBatchItemWriter} into the {@code bloginfonew} table.
 */
@Component
public class BlogInfoTaskNew {

    /** Listener attached to the job in {@link #myJobNew(JobBuilderFactory, Step)}. */
    @Autowired
    private MyJobListener myJobListener;

    /** MyBatis session factory handed to the cursor reader. */
    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Defines the job.
     *
     * @param jobs    factory used to create the job builder
     * @param stepNew the single step the job flow runs
     * @return the {@code myJobNew} job, using a {@link RunIdIncrementer} and the injected job listener
     */
    @Bean
    public Job myJobNew(JobBuilderFactory jobs, Step stepNew) {
        return jobs.get("myJobNew")
                .incrementer(new RunIdIncrementer())
                .flow(stepNew)
                .end()
                .listener(myJobListener)
                .build();
    }

    /**
     * Defines the item processor used by the step.
     *
     * @return a {@link MyItemProcessorNew} with a {@link MyBeanValidator} set as its validator
     */
    @Bean
    public ItemProcessor<BlogInfo, BlogInfo> processorNew() {
        MyItemProcessorNew processor = new MyItemProcessorNew();
        // Attach the validator to the processor.
        processor.setValidator(new MyBeanValidator<BlogInfo>());
        return processor;
    }

    /**
     * Defines the chunk-oriented step: read, process, then write in chunks of 65000 items.
     *
     * @param stepBuilderFactory factory used to create the step builder
     * @param itemReaderNew      reader supplying the {@link BlogInfo} items
     * @param writerNew          writer receiving each completed chunk
     * @param processorNew       processor applied to every item that was read
     * @return the configured {@code stepNew} step
     */
    @Bean
    public Step stepNew(StepBuilderFactory stepBuilderFactory,
                        MyBatisCursorItemReader<BlogInfo> itemReaderNew,
                        ItemWriter<BlogInfo> writerNew,
                        ItemProcessor<BlogInfo, BlogInfo> processorNew) {
        return stepBuilderFactory
                .get("stepNew")
                // Chunk mechanism: items are read and processed one at a time; once the
                // chunk size is reached the accumulated items are handed to the writer at once.
                .<BlogInfo, BlogInfo>chunk(65000)
                .reader(itemReaderNew)
                .faultTolerant()
                .retryLimit(3)
                .retry(Exception.class)
                .skip(Exception.class)
                .skipLimit(10)
                .listener(new MyReadListener())
                .processor(processorNew)
                .writer(writerNew)
                // NOTE(review): this second faultTolerant() section sets skipLimit(2) after the
                // earlier skipLimit(10). Presumably both configure the same step-level policy, in
                // which case the later value would win - confirm which limit is intended.
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(2)
                .listener(new MyWriteListener())
                .build();
    }

    /**
     * Defines the step-scoped MyBatis cursor reader.
     *
     * <p>Spring Batch provides a dedicated bean scope ({@link StepScope}). A step-scoped bean
     * is instantiated when the step starts, which allows step-specific configuration and
     * parameters (here the {@code authorId} job parameter) to be bound to it.
     *
     * @param authorId the {@code authorId} job parameter; must be an integer in string form
     * @return a reader that runs the {@code queryInfoById} statement with {@code authorId} as its parameter
     * @throws IllegalArgumentException if {@code authorId} is missing, blank or not an integer
     */
    @Bean
    @StepScope
    public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {

        System.out.println("开始查询数据库");

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("authorId", parseAuthorId(authorId));

        MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();
        // NOTE(review): the statement id uses the com.amc namespace while this class lives under
        // com.xiaoxin - confirm it matches the namespace declared by the mapper.
        reader.setQueryId("com.amc.executor.mapper.BlogMapper.queryInfoById");
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setParameterValues(parameterValues);
        return reader;
    }

    /**
     * Defines the item writer: sets the data source and the batch insert statement used to
     * write items to the database.
     *
     * @param masterDataSource the data source qualified as {@code masterDataSource}
     * @return a {@link JdbcBatchItemWriter} inserting into {@code bloginfonew}
     */
    @Bean
    @StepScope
    public ItemWriter<BlogInfo> writerNew(@Qualifier(value = "masterDataSource") DataSource masterDataSource) {
        // Named-parameter insert; the :name placeholders are resolved from BlogInfo bean properties.
        String sql = "insert into bloginfonew " + " (blogAuthor,blogUrl,blogTitle,blogItem) "
                + " values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";

        JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
        writer.setSql(sql);
        writer.setDataSource(masterDataSource);
        return writer;
    }

    /**
     * Converts the raw {@code authorId} job parameter into an {@link Integer}.
     *
     * <p>Fails with a message that names the parameter, so a misconfigured launch is easy to
     * diagnose instead of surfacing as a bare {@link NumberFormatException}.
     *
     * @param authorId raw parameter value, possibly {@code null}
     * @return the parsed id
     * @throws IllegalArgumentException if the value is missing, blank or not an integer
     */
    private static Integer parseAuthorId(String authorId) {
        if (authorId == null || authorId.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Job parameter 'authorId' is required but was missing or blank");
        }
        try {
            return Integer.valueOf(authorId.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Job parameter 'authorId' must be an integer, got: '" + authorId + "'", e);
        }
    }

}
